[Step Functions]動的並列処理(Map)を使って60分×24時間=1440ファイルのバッチ処理を楽々実装してみた
こんにちは、平野です。
下記のブログで紹介されているように、
Step Functionsにて、配列の入力を個別のLambda等にバラまいて処理させるMap
ステートがサポートされました!!
担当していた案件で、S3上にある直近24時間分ファイル群(各ファイル名に秒までの時刻が入っている)を、 1分毎にまとめて別のバケットに移すような処理があり、 これはまさにMapステートに最適な素材でしたので、Mapステートを使ったリファクタリングをしてみました!
この記事では、ServerlessFrameworkのStep Functionsプラグインを用いています。 (対応早くて助かる!!)
検証バージョン
ServerlessFramework: 1.40.0
Step Functionsプラグイン: 2.8.0
Mapステートの簡単なおさらい
Mapステートはちょっと使うだけなら仕組みはとっても簡単なので、 まずはビジュアルワークフローで挙動を確認しました。
MakeArrayState
で[1, 2, 3, 4, 5]
という配列を出力して
CalcSquareState
(Mapステート)に配列が入力されて
その中にある個々のCalcSquare
に配列の個々の値が入力され、2乗した値を出力
配列内の各々の値が2乗された配列が、CalcSquareState
の出力になっています。
ちなみに書くまでもないくらいですが、それぞれLambda関数は至極簡単です。
make_array.py
def lambda_handler(event, context): return [1, 2, 3, 4, 5]
calc_square.py
def lambda_handler(event, context): return event*event
つまり、Pythonで言えば次のようなmap関数などと同じ感じで実装が可能になります。
array = [1, 2, 3, 4, 5] squared = list(map(lambda x: x*x, array)) # ==> [1, 4, 9, 16, 25]
serverless.yml
上記を行うためのserverless.yml
とPythonスクリプトの例です。
service: hirano-stepfunctions-map-test-01 # NOTE: update this with your service name provider: name: aws runtime: python3.7 stage: ${env:STAGE, 'dev'} region: ap-northeast-1 functions: MakeArrayLambda: handler: handler/make_array.lambda_handler timeout: 30 CalcSquareLambda: handler: handler/calc_square.lambda_handler timeout: 30 stepFunctions: stateMachines: StateMachine01: definition: StartAt: MakeArrayState States: MakeArrayState: Type: Task Resource: Fn::GetAtt: [MakeArrayLambda, Arn] Next: CalcSquareState CalcSquareState: Type: Map Iterator: StartAt: CalcSquare States: CalcSquare: Type: Task Resource: Fn::GetAtt: [CalcSquareLambda, Arn] End: true End: true plugins: - serverless-step-functions package: exclude: - .git/** - node_modules - node_modules/** - __pycache__
大量のファイルに同一処理させる例
本題。
毎分ごとのファイルに統合するので、60分 * 24時間 = 1440個
のファイルを処理する仕組みを作りました。
CloudWatch Eventsからスケジュール起動され、その日付に対応した(前日の)24時間を対象とするような想定です。
挙動の確認のところで、Mapステートの(一番安直な)使い方はわかっているので、 特に解説するところもなく、それにしたがって実装するだけです。
serverless.yml
並列数が1440であり、全く別の処理のためのLambda関数も動いているので、
同時実行数(MaxConcurrency
)は100に制限しています。
service: cm-hirano-stepfunctions-map-test # NOTE: update this with your service name provider: name: aws runtime: python3.7 stage: ${env:STAGE, 'dev'} region: ap-northeast-1 memorySize: 256 iamRoleStatements: - Effect: "Allow" Action: - "s3:*" Resource: - "*" environment: STAGE: ${self:provider.stage} tracing: lambda: true functions: MakeEveryMinute: handler: handler/make_every_minute.lambda_handler timeout: 30 Concatenate: handler: handler/concatenate.lambda_handler timeout: 30 stepFunctions: stateMachines: StateMachine01: definition: StartAt: MakeEveryMinuteState States: MakeEveryMinuteState: Type: Task Resource: Fn::GetAtt: [MakeEveryMinuteLambdaFunction, Arn] Next: ConcatenateMapState ConcatenateMapState: Type: Map MaxConcurrency: 100 Iterator: StartAt: ConcatenateState States: ConcatenateState: Type: Task Resource: Fn::GetAtt: [ConcatenateLambdaFunction, Arn] End: true End: true plugins: - serverless-step-functions package: exclude: - .git/** - node_modules - node_modules/** - __pycache__
ビジュアルワークフローとしては以下のような感じです。
なお、Fn::GetAtt: [MakeEveryMinuteLambdaFunction, Arn]
という表記ですが、
普通はFn::GetAtt: [MakeEveryMinute, Arn]
でも可能なはずですが、うまくデプロイできませんでした。
Step Functionsプラグインのマニュアルに下記の記載があったのでそれに従ったところ、
問題なくデプロイができました。
You can also express the above Fn::GetAtt function as Fn::GetAtt: [HelloLambdaFunction, Arn]
make_every_minute.py
def lambda_handler(event, context): date = get_date(event['time']) minutes = [] for hour in range(0, 24): for minute in range(0, 60): minutes.append("{}-{}".format(str(hour).zfill(2), str(minute).zfill(2))) minutes = map(lambda x: "{}-{}".format(date, x), minutes) return minutes def get_date(time): # 具体的な実装は省略(実際はここがかなり案件依存なので) return "2019-09-30"
concatenate.py
import os import boto3 SRC_BUCKET = os.environ.get("SOURCE_BUCKET") s3_client = None def lambda_handler(event, context): global s3_client if not s3_client: s3_client = boto3.client('s3') # event = "2019-09-30-00-00" prefix = event print("PREFIX: {}".format(prefix)) obj_list = s3_client.list_objects_v2(Bucket=SRC_BUCKET, Prefix=prefix) if 'Contents' not in obj_list: raise ValueError("Could not found any files.") for obj in obj_list['Contents']: key = obj['Key'] # 結合して云々な処理 concatnate_process(key)
実行
CloudWatch Eventsからのスケジュール実行を想定しているので、 入力jsonは最低限以下のような形が入っていればOKです。
{ "time": "2019-03-01T01:23:45Z" }
実際に実行してみたところ、約1時間ほどで処理が完了しました
(実際の結合処理の内容などを何も示していないので、絶対値には意味がありませんが)。
1440個という、1アカウント内のLambda関数の同時実行上限である1000を超える数のLambda関数を起動させましたが、
MaxConcurrency
を指定した以外は特に並列起動のことを考えている箇所はありません。
つまり、「引数を受け取った後の処理内容だけ実装すれば良い」 という理想的な実装スタイルで処理を書くことができています! 規模が大きくなると、こういう単純さは非常に重要です。
Mapステートへ渡すデータについての注意点
上記の例では配列に2019-09-30-00-00
のような文字列を渡しています。
Step FunctionsのParameters機能を使えば
{'date': '2019-09-30', 'minute': '00-00'}
のようにして、minute
だけを変動させることも可能なのですが、
どうやらMapステートとParametersの相性は良くないようです。
上記のようにしたところ、並列数が少ないうちは問題ないのですが、
並列数が大きくなった場合に処理に失敗してしまいました。
原因はまだはっきりしないのですが、 ひとまず単純な配列を受け渡しに使うと正常に動作するようなので、 Parametersを使ってうまくいかない場合には、 必要なデータを全て入れた配列を渡してあげるのが一番良さそうです。
ただ、このように配列の中身を大きくすると、 今度は、 受け渡しデータのサイズ上限 に引っかかってしまうという問題点もあります。 ですので、大きすぎるデータの場合にはまだ別のアーキテクチャを考える必要がありそうです。
この辺についてはキムさんのブログが非常に参考になりましたので、 そちらも是非ご参照ください。
まとめ
S3上の大量ファイルに対して、ある程度の個数でグルーピングするような処理を Step FunctionsのMapステートを使って実装してみました。 Mapステートの使い方は非常に直感的なので、 すでにStep Functionsを使っている人であればほぼ瞬時に使いこなせるのではないでしょうか?
1000を超えるような処理だとまだ課題もありそうなMapステートですが、 そこまで大きくない規模のファイル群に対しては、 直感的に、複雑なことを考えずにLambda処理を並列化させられるので非常にオススメです。 異なる引数についてLambda関数をばら撒きたいという場合にはまず最初に検討すべきパターンになるかと思います。
以上、誰かの参考になれば幸いです。
参照リンク
Amazon Web Services ブログ 新機能 – Step Functions が動的並列処理をサポート AWS Step Functions - Map